fix: cross-CN shuffle join INSERT...SELECT hang on multi-CN cluster (#24919) #25158
ck89119 wants to merge 4 commits into
…atrixorigin#24919)
Root cause: newShuffleJoinScopeList leaves a CN's dop join buckets in separate RemoteRun trees, while the shuffle dispatch attaches to only the first one. In compileMultiUpdate (used by large INSERT...SELECT), newMergeScope sends each bucket individually -> checkPipeline detects dispatch.LocalRegs pointing to out-of-tree sibling buckets -> the pipeline is converted to local on the coordinator -> the dispatch runs on the wrong CN, mispaired with the compile-time cross-CN receiver FromAddr -> remote GetProcByUuid spins / merge WaitingEnd waits forever -> inserts of 5M+ rows hang.
Fix: add groupShuffleBucketsByCNIfNeeded (gating: multi-CN + cross-CN dispatch present), which merges same-CN shuffle buckets and their nested dispatch into one per-CN send unit via mergeScopesByCN. When the whole group is sent as one RemoteRun unit to its target CN, the dispatch runs on the correct CN, checkPipeline returns true, and the cross-CN receiver handshake completes normally. No-op for single-CN / non-shuffle inserts. Wired into compileInsert and compileMultiUpdate (toWriteS3 + !toWriteS3, 4 call sites).
Verified on a 2-CN docker cluster: 5M rows x 2 consecutive runs complete in ~17s with count(*)=5,000,000 and correct data; previously hung forever.
Co-Authored-By: Claude <noreply@anthropic.com>
- document scopeTreeHasCrossCNDispatch precondition (dispatch is always RootOp)
- document that grouping preserves the caller's existing root operator via AppendChild
- extend grouping to the compileInsert S3 sink-scan path: dataScope.MergeRun still sends each bucket individually, so a cross-CN shuffle dispatch there would hit the same convert-to-local hang; group same-CN buckets first
Co-Authored-By: Claude <noreply@anthropic.com>
XuPeng-SH
left a comment
I re-checked the current head and found one substantive regression in the new CN-grouped insert path.
In the non-S3 branch, the new groupShuffleBucketsByCNIfNeeded(ss) call can replace each insert scope with a per-CN Remote merge wrapper. That wrapper’s root operator is just Merge; the real Insert operators remain only in PreScopes.
The execution path does not aggregate affected rows from those grouped child scopes for normal inserts:
- `newMergeScopeByCN()` builds a `Remote` scope with `Merge` as `RootOp` and moves the original insert scopes into `PreScopes`
- `run()` special-cases `MultiUpdate`, but for ordinary grouped insert scopes it still just does `c.addAffectedRows(s.affectedRows())`
- `affectedRows()` only walks the `RootOp` child chain, not `PreScopes`
So these grouped non-S3 inserts can write data successfully while reporting 0 rows affected.
Concrete suggestions
- Either avoid CN-grouping in the non-S3 insert path, or
- aggregate affected rows recursively from grouped child scopes (the same way `run()` already does for `MultiUpdate`), or
- teach `affectedRows()` / `run()` how to include `PreScopes` for this grouped-insert container pattern.
I would keep this at request changes because it changes visible DML semantics even when the data write itself succeeds.
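The accounting gap described in this review can be illustrated with a minimal sketch. The `Op` and `Scope` types below are toy stand-ins invented for illustration, not the real MatrixOne types, and `affectedRowsRecursive` is a hypothetical version of the second suggestion rather than code from the PR.

```go
package main

import "fmt"

// Op is a toy operator; Child forms the RootOp child chain.
// Hypothetical type, not the real MatrixOne operator.
type Op struct {
	Name         string
	AffectedRows uint64
	Child        *Op
}

// Scope is a toy stand-in for a compile scope.
type Scope struct {
	RootOp    *Op
	PreScopes []*Scope
}

// affectedRows walks only the RootOp child chain, as the review describes.
func affectedRows(s *Scope) uint64 {
	var total uint64
	for op := s.RootOp; op != nil; op = op.Child {
		total += op.AffectedRows
	}
	return total
}

// affectedRowsRecursive is a hypothetical variant that also descends
// into PreScopes, so rows counted by nested Insert operators are included.
func affectedRowsRecursive(s *Scope) uint64 {
	total := affectedRows(s)
	for _, pre := range s.PreScopes {
		total += affectedRowsRecursive(pre)
	}
	return total
}

// groupedInsert builds a per-CN Merge container whose PreScopes hold
// two insert buckets that wrote 3 and 4 rows (arbitrary sample values).
func groupedInsert() *Scope {
	b1 := &Scope{RootOp: &Op{Name: "Insert", AffectedRows: 3}}
	b2 := &Scope{RootOp: &Op{Name: "Insert", AffectedRows: 4}}
	return &Scope{RootOp: &Op{Name: "Merge"}, PreScopes: []*Scope{b1, b2}}
}

func main() {
	g := groupedInsert()
	fmt.Println("root chain only:", affectedRows(g))           // 0
	fmt.Println("with PreScopes: ", affectedRowsRecursive(g)) // 7
}
```

In this toy model the container's root chain holds only `Merge`, so the chain-only walk reports 0 even though 7 rows were written.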
…24919 review)
XuPeng-SH review: in the compileInsert non-S3 path each bucket carries an Insert as its top-level RootOp. groupShuffleBucketsByCNIfNeeded would move those Insert ops into a per-CN Merge container's PreScopes, where run()/affectedRows() (which only walk the RootOp child chain, not PreScopes) miss them -> data written but "0 rows affected".
The other grouped paths are unaffected: their write op already sits in PreScopes with a top-level aggregator (mergeblock for write-S3 insert, FlushS3Info for multi-update) that reports affected rows, so grouping only deepens PreScopes without changing the accounting.
Non-S3 inserts are small and do not produce cross-CN shuffle, so removing grouping here loses no hang coverage.
Co-Authored-By: Claude <noreply@anthropic.com>
…24919 review)
Critical: my earlier "skip grouping in non-S3 insert" was wrong. toWriteS3 is decided by INSERT output size (~35K-row threshold) while cross-CN shuffle is decided by JOIN input size + CN count; the two are independent. A large shuffle join with a highly selective filter / low match rate yields few output rows (non-S3) yet still shuffles across CNs, so the non-S3 path can carry a cross-CN dispatch and would silently hang without grouping.
Apply groupShuffleBucketsByCNIfNeeded BEFORE attaching Insert, so the per-CN container gets Insert as its RootOp (Insert -> Merge): checkPipeline returns true (no hang) and affectedRows() still finds the Insert on the RootOp chain (correct row count). No-op when no cross-CN dispatch is present.
Also from review:
- scopeTreeHasCrossCNDispatch: document the intentional narrower scope vs checkPipelineStandaloneExecutableAtRemote (dispatch only, not connector).
- isSameCN: log malformed-address fallback at Warn (was Debug) so a wrong-CN merge is visible in ops.
Co-Authored-By: Claude <noreply@anthropic.com>
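The ordering described in this commit (group first, then attach Insert) can be sketched as follows. All types and helpers here are toy stand-ins invented for illustration; `groupBuckets` and `attachInsert` are hypothetical names, not the PR's functions, and the row count is an arbitrary sample value.

```go
package main

import "fmt"

// Op and Scope are toy stand-ins, not the real MatrixOne types.
type Op struct {
	Name         string
	AffectedRows uint64
	Child        *Op
}

type Scope struct {
	RootOp    *Op
	PreScopes []*Scope
}

// rootChainRows sums AffectedRows along the RootOp child chain only.
func rootChainRows(s *Scope) uint64 {
	var total uint64
	for op := s.RootOp; op != nil; op = op.Child {
		total += op.AffectedRows
	}
	return total
}

// rootChainNames renders the RootOp child chain, top operator first.
func rootChainNames(s *Scope) string {
	out := ""
	for op := s.RootOp; op != nil; op = op.Child {
		if out != "" {
			out += " -> "
		}
		out += op.Name
	}
	return out
}

// groupBuckets wraps same-CN buckets in one container rooted at Merge.
func groupBuckets(buckets []*Scope) *Scope {
	return &Scope{RootOp: &Op{Name: "Merge"}, PreScopes: buckets}
}

// attachInsert places an Insert operator above the scope's current RootOp.
func attachInsert(s *Scope, rows uint64) {
	s.RootOp = &Op{Name: "Insert", AffectedRows: rows, Child: s.RootOp}
}

func main() {
	buckets := []*Scope{{RootOp: &Op{Name: "Join"}}, {RootOp: &Op{Name: "Join"}}}
	container := groupBuckets(buckets) // group first
	attachInsert(container, 7)         // then attach Insert on top
	fmt.Println(rootChainNames(container), rootChainRows(container))
}
```

Because the Insert sits on the container's root chain in this model, a chain-only walk still finds its row count, while the join buckets stay together in `PreScopes`.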
What type of PR is this?
Which issue(s) this PR fixes:
issue #24919
What this PR does / why we need it:
Fixes a silent deadlock where `INSERT ... SELECT` with a cross-CN shuffle hash join (`shuffle: range(...)`) over large tables (>=5M rows/table) hangs forever on multi-CN clusters. The same statement completes in ~3-6s on a single-node deployment.
Root cause
`newShuffleJoinScopeList` leaves a CN's `dop` join buckets as independent `RemoteRun` trees, while the shuffle dispatch attaches to only the first bucket. When `compileMultiUpdate` (used by large `INSERT...SELECT`) sends each bucket individually via `newMergeScope`:
- `checkPipelineStandaloneExecutableAtRemote` detects the dispatch's `LocalRegs` pointing to out-of-tree sibling buckets → correctly returns `false`
- `RemoteRun` converts the pipeline to local on the coordinator
- the dispatch runs on the wrong CN, mispaired with the compile-time cross-CN receiver `FromAddr`
- remote `GetProcByUuid` busy-spins endlessly (300s timeout) → idle `WaitingEnd` deadlock (uncancellable `context.TODO()`)
Fix
Add `groupShuffleBucketsByCNIfNeeded` (gated: multi-CN + cross-CN dispatch subtree present), which reuses `mergeScopesByCN` to group same-CN shuffle buckets, together with their nested dispatch, into one per-CN send unit. When the whole group is sent via a single `RemoteRun` to its target CN:
- `checkPipelineStandaloneExecutableAtRemote` returns `true` (all dispatch `LocalRegs` are within the same tree)
- no changes are needed to the `RemoteRun` protocol, dispatch operator, or `checkPipeline` logic
The grouping is a no-op for single-CN / non-shuffle inserts (confirmed by gating UT).
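The effect of per-CN grouping on the standalone-executable check can be modeled with a small sketch. Everything below is a simplified toy under stated assumptions: `Bucket`, `Unit`, `standaloneExecutable`, and `groupByCN` are hypothetical names, the CN addresses are made up, and the gating and cross-CN remote registers of the real code are omitted.

```go
package main

import "fmt"

// Bucket is a toy stand-in for one shuffle-join bucket scope.
type Bucket struct {
	ID        int
	CN        string // address of the CN the bucket should run on
	LocalRegs []int  // bucket IDs fed by this bucket's dispatch via local registers
}

// Unit is one send unit: everything shipped in a single remote run.
type Unit struct {
	CN      string
	Buckets []Bucket
}

// standaloneExecutable reports whether every local register targeted by
// a dispatch inside the unit belongs to a bucket in the same unit.
func standaloneExecutable(u Unit) bool {
	inTree := make(map[int]bool, len(u.Buckets))
	for _, b := range u.Buckets {
		inTree[b.ID] = true
	}
	for _, b := range u.Buckets {
		for _, reg := range b.LocalRegs {
			if !inTree[reg] {
				return false
			}
		}
	}
	return true
}

// groupByCN merges buckets that share a CN into one unit,
// preserving the order in which CNs are first seen.
func groupByCN(buckets []Bucket) []Unit {
	index := map[string]int{}
	var units []Unit
	for _, b := range buckets {
		i, ok := index[b.CN]
		if !ok {
			i = len(units)
			index[b.CN] = i
			units = append(units, Unit{CN: b.CN})
		}
		units[i].Buckets = append(units[i].Buckets, b)
	}
	return units
}

// sampleBuckets models two CNs with dop=2; the dispatch hangs off
// the first bucket of each CN and targets both local buckets.
func sampleBuckets() []Bucket {
	return []Bucket{
		{ID: 0, CN: "cn1", LocalRegs: []int{0, 1}},
		{ID: 1, CN: "cn1"},
		{ID: 2, CN: "cn2", LocalRegs: []int{2, 3}},
		{ID: 3, CN: "cn2"},
	}
}

func main() {
	buckets := sampleBuckets()
	// Pre-fix model: each bucket is sent as its own unit.
	for _, b := range buckets {
		u := Unit{CN: b.CN, Buckets: []Bucket{b}}
		fmt.Printf("bucket %d alone: standalone=%v\n", b.ID, standaloneExecutable(u))
	}
	// Post-fix model: one unit per CN.
	for _, u := range groupByCN(buckets) {
		fmt.Printf("%s group (%d buckets): standalone=%v\n", u.CN, len(u.Buckets), standaloneExecutable(u))
	}
}
```

In this model the bucket that carries the dispatch fails the check when sent alone, because its registers point at a sibling outside its tree, and passes once the siblings travel in the same unit.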
Changes (2 files, +171 lines)
- `pkg/sql/compile/compile.go` (+62): `groupShuffleBucketsByCNIfNeeded` + `scopeTreeHasCrossCNDispatch` helpers, wired into `compileInsert` and `compileMultiUpdate` (4 call sites: `toWriteS3` and `!toWriteS3` paths)
- `pkg/sql/compile/remoterun_test.go` (+109): UT reproducing the pre-fix `checkPipeline=false` + verifying per-CN containers return `true` post-fix + gating test (no regressions for single-CN / non-shuffle)
Verification
- 2-CN docker cluster (`etc/docker-multi-cn-local-disk`): 5M rows × 2 consecutive runs complete in ~17s each, `count(*)=5,000,000`, data fully correct (`distinct_id=5,000,000`, `bad_pad=0`, `bad_k=0`). Previously hung forever.
- UT pass with `-race`; new helper coverage 100%; `go vet` clean; `go build ./pkg/sql/...` passes
🤖 Generated with Claude Code